Distributed Coordination with ZooKeeper: Leader Election and Service Discovery in Go
February 2, 2026
In previous posts, we built the core components of our distributed search system: TF-IDF ranking, HTTP networking, search workers, and a coordinator. But how do we decide which node becomes the coordinator? What happens when a node crashes?
Enter Apache ZooKeeper — a distributed coordination service that solves these problems elegantly.
The Coordination Problem
In a distributed system, we face several challenges:
- Leader Election: Only one node should be the coordinator at any time
- Failure Detection: We need to know when nodes crash
- Service Discovery: Workers need to find the coordinator, and vice versa
- Consistency: All nodes must agree on who the leader is
ZooKeeper provides primitives that make solving these problems straightforward.
ZooKeeper Fundamentals
Znodes
ZooKeeper stores data in a hierarchical namespace, similar to a filesystem. Each node is called a znode. There are two special types:
- Ephemeral znodes: Automatically deleted when the session that created them ends (perfect for failure detection)
- Sequential znodes: Have a monotonically increasing counter appended to their name (perfect for ordering)
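To make the two flags concrete, here is a minimal, self-contained sketch of creating both kinds of znodes. It assumes the github.com/go-zookeeper/zk client (the same zk package the snippets below use) and a local ZooKeeper on 127.0.0.1:2181; the paths are throwaway examples.

package main

import (
    "log"
    "time"

    "github.com/go-zookeeper/zk" // assumed import path; an older fork lives at github.com/samuel/go-zookeeper/zk
)

func main() {
    // Connect with a 10-second session timeout; the returned event channel
    // reports connection-state changes (it comes up again in the failure section).
    conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    acl := zk.WorldACL(zk.PermAll)

    // Ephemeral: deleted automatically when this session ends.
    if _, err := conn.Create("/demo_ephemeral", []byte("gone when my session dies"), zk.FlagEphemeral, acl); err != nil {
        log.Fatal(err)
    }

    // Ephemeral + sequential: ZooKeeper appends an increasing counter to the
    // name (e.g. /demo_seq_0000000003) and still deletes it on session end.
    name, err := conn.Create("/demo_seq_", []byte{}, zk.FlagEphemeral|zk.FlagSequence, acl)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("created %s", name)
}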
Our Namespace Structure
/
├── election/
│   ├── c_0000000001 (ephemeral sequential - Node A)
│   ├── c_0000000002 (ephemeral sequential - Node B)
│   └── c_0000000003 (ephemeral sequential - Node C)
├── workers_service_registry/
│   ├── n_0000000001 (ephemeral sequential - "http://worker1:8081/task")
│   └── n_0000000002 (ephemeral sequential - "http://worker2:8082/task")
└── coordinators_service_registry/
    └── n_0000000001 (ephemeral sequential - "http://coordinator:8080/search")
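One practical detail the implementation below takes for granted: the three parent znodes have to exist before anyone can create children under them. A minimal bootstrap sketch (a hypothetical EnsureRootPaths helper, using the same zk client as above and treating "node already exists" as success):

func EnsureRootPaths(conn *zk.Conn) error {
    paths := []string{
        "/election",
        "/workers_service_registry",
        "/coordinators_service_registry",
    }
    for _, p := range paths {
        // Persistent znodes (flags = 0), so the parents survive node restarts.
        _, err := conn.Create(p, []byte{}, 0, zk.WorldACL(zk.PermAll))
        if err != nil && err != zk.ErrNodeExists {
            return err
        }
    }
    return nil
}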
Leader Election Algorithm
The election algorithm is beautifully simple:
1. Each node creates an ephemeral sequential znode under /election
2. Get all children and sort them lexicographically
3. If your znode is the smallest, you're the leader
4. Otherwise, watch the znode immediately before yours
5. When that znode is deleted, repeat from step 2
This is called the "herd effect" avoidance pattern — only one node wakes up when the leader dies, not all of them.
Implementation
package cluster

const (
    ElectionPath   = "/election"
    ElectionPrefix = "c_"
)

type LeaderElection struct {
    conn             *zk.Conn
    callback         OnElectionCallback
    currentZnodeName string
    mu               sync.Mutex
}

// OnElectionCallback defines what happens when role changes
type OnElectionCallback interface {
    OnElectedToBeLeader()
    OnWorker()
}
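The post doesn't show how a LeaderElection is constructed; a minimal, hypothetical constructor would simply wire the connection and callback together:

// NewLeaderElection is a hypothetical constructor; the repo may name or shape it differently.
func NewLeaderElection(conn *zk.Conn, callback OnElectionCallback) *LeaderElection {
    return &LeaderElection{
        conn:     conn,
        callback: callback,
    }
}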
Volunteering for Leadership
When a node starts, it creates an ephemeral sequential znode:
func (le *LeaderElection) VolunteerForLeadership() error {
    le.mu.Lock()
    defer le.mu.Unlock()

    znodePath := ElectionPath + "/" + ElectionPrefix
    createdPath, err := le.conn.CreateProtectedEphemeralSequential(
        znodePath,
        []byte{},
        zk.WorldACL(zk.PermAll),
    )
    if err != nil {
        return err
    }

    // Extract just the znode name from the full path
    le.currentZnodeName = createdPath[len(ElectionPath)+1:]
    log.Printf("Volunteered for leadership with znode: %s", le.currentZnodeName)

    return nil
}
The CreateProtectedEphemeralSequential method is crucial — it handles the case where the connection drops during creation, preventing orphaned znodes.
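For comparison, here is what the unprotected variant would look like, passing the raw flags to a plain Create. This sketch is illustrative only and is not part of the post's code: if the connection drops after the server creates the znode but before the client sees the response, a naive retry of this call would leave an orphaned election znode behind, which is exactly what the protected call guards against.

// volunteerUnprotected is an illustrative alternative, not used by the post.
// It creates the same kind of znode but offers no protection against retries
// after a dropped connection.
func (le *LeaderElection) volunteerUnprotected() (string, error) {
    return le.conn.Create(
        ElectionPath+"/"+ElectionPrefix,
        []byte{},
        zk.FlagEphemeral|zk.FlagSequence,
        zk.WorldACL(zk.PermAll),
    )
}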
Running the Election
func (le *LeaderElection) reelectLeaderInternal() error {
    // Get all children of the election znode
    children, _, err := le.conn.Children(ElectionPath)
    if err != nil {
        return err
    }

    // Sort children lexicographically
    sort.Strings(children)

    // Find our position
    ourIndex := -1
    for i, child := range children {
        if child == le.currentZnodeName {
            ourIndex = i
            break
        }
    }

    // Guard against the pathological case where our znode is missing
    // (e.g. the session expired); indexing children[-2] below would panic.
    if ourIndex == -1 {
        return fmt.Errorf("znode %s not found under %s", le.currentZnodeName, ElectionPath)
    }

    // If we're the smallest (first), we're the leader
    if ourIndex == 0 {
        log.Printf("Elected as leader with znode: %s", le.currentZnodeName)
        le.callback.OnElectedToBeLeader()
        return nil
    }

    // Otherwise, we're a worker - watch our predecessor
    predecessorName := children[ourIndex-1]
    predecessorPath := ElectionPath + "/" + predecessorName

    log.Printf("Not leader. Watching predecessor: %s", predecessorName)
    le.callback.OnWorker()

    // Set up watch on predecessor
    go le.watchPredecessor(predecessorPath)

    return nil
}
Watching the Predecessor
func (le *LeaderElection) watchPredecessor(predecessorPath string) {
    for {
        exists, _, eventChan, err := le.conn.ExistsW(predecessorPath)
        if err != nil {
            log.Printf("Error watching predecessor: %v", err)
            return
        }

        // Handle race condition: predecessor might have disappeared
        if !exists {
            log.Printf("Predecessor no longer exists, triggering re-election")
            le.ReelectLeader()
            return
        }

        // Wait for an event
        event := <-eventChan
        if event.Type == zk.EventNodeDeleted {
            log.Printf("Predecessor deleted, triggering re-election")
            le.ReelectLeader()
            return
        }
    }
}
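watchPredecessor calls ReelectLeader, which isn't shown in this excerpt. Assuming it only needs to guard the internal method with the mutex, a minimal version could be:

// ReelectLeader re-runs the election under the lock. A sketch of the exported
// method referenced above; the repo's version may differ.
func (le *LeaderElection) ReelectLeader() error {
    le.mu.Lock()
    defer le.mu.Unlock()
    return le.reelectLeaderInternal()
}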
Service Registry
The service registry allows nodes to discover each other. Workers register their /task endpoint, and the coordinator registers its /search endpoint.
Implementation
type ZkServiceRegistry struct {
    conn                *zk.Conn
    registryPath        string
    currentZnode        string
    allServiceAddresses []string
    mu                  sync.RWMutex
}

const (
    WorkersRegistryPath      = "/workers_service_registry"
    CoordinatorsRegistryPath = "/coordinators_service_registry"
)
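A pair of hypothetical constructors (names assumed, not from the post) makes the split explicit: workers register under one subtree, the coordinator under the other.

// Hypothetical helpers: the two registries only differ in their base path.
func NewWorkersServiceRegistry(conn *zk.Conn) *ZkServiceRegistry {
    return &ZkServiceRegistry{conn: conn, registryPath: WorkersRegistryPath}
}

func NewCoordinatorsServiceRegistry(conn *zk.Conn) *ZkServiceRegistry {
    return &ZkServiceRegistry{conn: conn, registryPath: CoordinatorsRegistryPath}
}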
Registering to the Cluster
func (sr *ZkServiceRegistry) RegisterToCluster(address string) error {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    // Prevent duplicate registration
    if sr.currentZnode != "" {
        log.Printf("Warning: Already registered, skipping")
        return nil
    }

    // Create ephemeral sequential znode with address as data
    znodePath := sr.registryPath + "/n_"
    createdPath, err := sr.conn.CreateProtectedEphemeralSequential(
        znodePath,
        []byte(address),
        zk.WorldACL(zk.PermAll),
    )
    if err != nil {
        return err
    }

    sr.currentZnode = createdPath
    log.Printf("Registered at %s with address %s", createdPath, address)

    return nil
}
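Scenario 2 later notes that a newly promoted leader unregisters from the workers registry. That method isn't shown in the post; a minimal sketch, assuming it just deletes the znode created above, might be:

// UnregisterFromCluster is a sketch of the inverse of RegisterToCluster;
// the actual repo may handle this differently.
func (sr *ZkServiceRegistry) UnregisterFromCluster() error {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    if sr.currentZnode == "" {
        return nil // never registered, nothing to delete
    }
    // Version -1 means "delete regardless of the znode's current version".
    if err := sr.conn.Delete(sr.currentZnode, -1); err != nil && err != zk.ErrNoNode {
        return err
    }
    sr.currentZnode = ""
    return nil
}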
Discovering Services
func (sr *ZkServiceRegistry) GetAllServiceAddresses() ([]string, error) {
    children, _, err := sr.conn.Children(sr.registryPath)
    if err != nil {
        return nil, err
    }

    sort.Strings(children)

    addresses := make([]string, 0, len(children))
    for _, child := range children {
        childPath := sr.registryPath + "/" + child
        data, _, err := sr.conn.Get(childPath)
        if err != nil {
            if err == zk.ErrNoNode {
                continue // Node was deleted, skip
            }
            return nil, err
        }
        if len(data) > 0 {
            addresses = append(addresses, string(data))
        }
    }

    return addresses, nil
}
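As a usage sketch (a hypothetical call site; the actual fan-out logic was covered in the earlier coordinator post), the coordinator would fetch the current worker addresses right before distributing a search:

// distributeSearch is a hypothetical helper showing how the registry is consumed.
func distributeSearch(workersRegistry *ZkServiceRegistry, query string) error {
    workers, err := workersRegistry.GetAllServiceAddresses()
    if err != nil {
        return err
    }
    for _, addr := range workers {
        // addr is a worker's /task endpoint, e.g. "http://worker1:8081/task".
        log.Printf("sending %q to %s", query, addr)
    }
    return nil
}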
Watching for Changes
The coordinator needs to know when workers join or leave:
func (sr *ZkServiceRegistry) RegisterForUpdates() {
    go sr.watchForUpdates()
}

func (sr *ZkServiceRegistry) watchForUpdates() {
    for {
        children, _, eventChan, err := sr.conn.ChildrenW(sr.registryPath)
        if err != nil {
            log.Printf("Error watching registry: %v", err)
            return
        }

        // Update cached addresses
        sr.updateCachedAddresses(children)

        // Wait for changes
        event := <-eventChan
        if event.Type == zk.EventNodeChildrenChanged {
            log.Printf("Registry changed, refreshing addresses")
        }
    }
}
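watchForUpdates calls updateCachedAddresses, which the post doesn't show. A minimal sketch, resolving each child to its stored address (as GetAllServiceAddresses does) and then swapping the cache under the write lock:

// updateCachedAddresses is a sketch of the helper referenced in watchForUpdates;
// the repo's version may differ in details.
func (sr *ZkServiceRegistry) updateCachedAddresses(children []string) {
    addresses := make([]string, 0, len(children))
    for _, child := range children {
        data, _, err := sr.conn.Get(sr.registryPath + "/" + child)
        if err != nil {
            continue // znode vanished or read failed; skip it
        }
        if len(data) > 0 {
            addresses = append(addresses, string(data))
        }
    }

    sr.mu.Lock()
    sr.allServiceAddresses = addresses
    sr.mu.Unlock()
}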
Failure Scenarios
Let's trace through what happens in various failure scenarios:
Scenario 1: Worker Crashes
- Worker's ZooKeeper session ends
- Ephemeral znode in /workers_service_registry is deleted
- Coordinator's watch fires
- Coordinator refreshes worker list
- Future tasks are distributed to remaining workers
Scenario 2: Coordinator Crashes
- Coordinator's ZooKeeper session ends
- Ephemeral znode in /election is deleted
- Next worker's watch fires (the one watching the coordinator)
- That worker runs re-election
- It finds itself as the smallest znode
- It becomes the new coordinator
- It unregisters from workers, registers as coordinator
Scenario 3: Network Partition
- Node loses connection to ZooKeeper
- Session timeout expires
- All ephemeral znodes for that node are deleted
- Other nodes detect the change via watches
- When connection is restored, node re-registers
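The session events needed for Scenario 3 come for free from the client: zk.Connect returns an event channel that reports state changes, so a node can notice disconnects and re-register once a session is established again. A minimal sketch (the onSession hook is hypothetical; in practice it would call VolunteerForLeadership and RegisterToCluster again, since the old ephemeral znodes are gone after an expiry):

// watchSession logs connection-state changes and invokes a caller-supplied
// hook whenever a (new) session is established.
func watchSession(events <-chan zk.Event, onSession func()) {
    for ev := range events {
        switch ev.State {
        case zk.StateDisconnected:
            log.Printf("lost connection to ZooKeeper")
        case zk.StateExpired:
            log.Printf("session expired; our ephemeral znodes are gone")
        case zk.StateHasSession:
            log.Printf("session established; (re-)registering")
            onSession()
        }
    }
}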
Testing the Election Logic
We use unit tests to verify the sorting and callback logic:
func TestSortZnodeNames(t *testing.T) {
    tests := []struct {
        name     string
        input    []string
        expected []string
    }{
        {
            name:     "reverse order",
            input:    []string{"c_0000000003", "c_0000000002", "c_0000000001"},
            expected: []string{"c_0000000001", "c_0000000002", "c_0000000003"},
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            result := SortZnodeNames(tt.input)
            // ... verify result matches expected
        })
    }
}

// Mock callback for testing
type MockCallback struct {
    LeaderCalled bool
    WorkerCalled bool
}

func (m *MockCallback) OnElectedToBeLeader() {
    m.LeaderCalled = true
}

func (m *MockCallback) OnWorker() {
    m.WorkerCalled = true
}
The Complete Picture
Here's how all the pieces fit together:
┌─────────────────────────────────────────────────────┐
│                  ZooKeeper Cluster                  │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  /election  │  │  /workers   │  │/coordinators│  │
│  │  c_001 ←────┼──┼─────────────┼──┼─────────────┤  │
│  │  c_002      │  │  n_001      │  │  n_001      │  │
│  │  c_003      │  │  n_002      │  │             │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
└─────────────────────────────────────────────────────┘
          │                │                │
          │                │                │
     ┌────▼────┐      ┌────▼────┐      ┌────▼────┐
     │ Node A  │      │ Node B  │      │ Node C  │
     │ (Leader)│      │ (Worker)│      │ (Worker)│
     │Coordinator     │SearchWorker    │SearchWorker
     └─────────┘      └─────────┘      └─────────┘
Key Takeaways
- Ephemeral znodes = automatic failure detection: When a node dies, its znodes disappear
- Sequential znodes = ordering: Perfect for leader election
- Watches = reactive updates: No polling needed
- Herd effect avoidance: Only watch your predecessor, not the leader
- Protected creates: Handle connection drops during znode creation
What's Next?
In the next post, we'll implement the OnElectionAction component that ties everything together — handling role transitions when a node becomes a leader or worker.
Get the Code
git clone git@github.com:UnplugCharger/distributed_doc_search.git distributed-search-cluster-go
cd distributed-search-cluster-go
git checkout 05-zookeeper-cluster
go test ./internal/cluster/... -v
This post is part of the "Distributed Document Search" series. Follow along as we build a production-ready search cluster from scratch.